package cn.ant.mqHandle;

import cn.ant.config.RabbitMQConfig;
import cn.ant.entity.MessageSendDTO;
import com.alibaba.fastjson2.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.ValueOperations;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

/***
 * 消费者类
 * @author Anhui AntLaddie <a href="https://juejin.cn/user/4092805620185316">(掘金蚂蚁小哥)</a>
 * @version 1.0
 **/
@Slf4j
@Component
@RequiredArgsConstructor
public class QueueConsumer {

    // 注入StringRedisTemplate对象
    private final StringRedisTemplate redisTemplate;

    /***
     * 消费者（监听队列ordinaryQueue）
     * @param msgData 传递的具体消息内容，最好是生产者发送使用什么类型，这里接收就用什么类型
     * @param deliveryTag 处理消息的编号，一般在消息确认时要使用
     * @param message 这个就类似我们原生的message
     * @param channel 这个就类似我们原生的channel
     * 关于@RabbitListener：只需要监听队列即可,多个则在{}里面逗号分割；ackMode确认模式
     */
    @RabbitListener(queues = {RabbitMQConfig.ORDINARY_QUEUE}, ackMode = "MANUAL")
    public void ordinaryQueueConsumption(@Payload String msgData,
                                         @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag,
                                         Message message,
                                         Channel channel) throws IOException {
        // redis的键值操作对象
        ValueOperations<String, String> forValue = redisTemplate.opsForValue();
        // 获取到队列消息，因为发送是JSON格式，我们要解析对象格式
        // message.getBody()：存储消息的具体内容（序列化后的二进制数据）
        String msgJsonStr = new String(message.getBody(), StandardCharsets.UTF_8);
        MessageSendDTO msg = JSONObject.parseObject(msgJsonStr, MessageSendDTO.class);
        // Redis Key
        String key = "rabbit:consume:order:" + msg.getMsgID();
        try {
            // 判断消息有没有被消费过，没有消费过则设置（代表有消费者准备消费了）
            Boolean result = forValue.setIfAbsent(key, String.valueOf(msg.getMsgID()), 1, TimeUnit.DAYS);
            // 判断，若设置成功代表可以消费此条消息
            if (Boolean.TRUE.equals(result)) {
                log.info("A：消息由消费者A消费：{}，并消费完成", msg);
                // 手动确认，注：这个deliveryTag可以通过message.getMessageProperties().getDeliveryTag()拿到
                channel.basicAck(deliveryTag, false);
            } else {
                log.info("消费者当前消费的消息被别的消费者已经消费过了，或者正在消费：{}", msg);
                // 重复发送的也得手动确认掉，但是不处理
                channel.basicAck(deliveryTag, false);
            }
        } catch (Exception e) {
            // 若消费失败则删除之前的锁定（缓存）,下次队列投递给消费者的时候可以继续消费
            redisTemplate.delete(key);
        }
    }
}
